/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.scheduler

/**
  * An interface for sort algorithm
  *     用于排序算法的接口
  * FIFO: FIFO algorithm between TaskSetManagers
  *  FIFO:   TaskSetManager 之间的排序
  *
  * FS: FS algorithm between Pools, and FIFO or FS within Pools
  *  FS: 池之间排序, 以及池内的 FIFO 或 FS
  */
private[spark] trait SchedulingAlgorithm {
    def comparator(s1: Schedulable, s2: Schedulable): Boolean
}

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
    // 是不是先调度 s1
    override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
        val priority1 = s1.priority
        val priority2 = s2.priority
        var res = math.signum(priority1 - priority2)
        if (res == 0) {
            val stageId1 = s1.stageId
            val stageId2 = s2.stageId
            res = math.signum(stageId1 - stageId2)
        }
        res < 0  // 值小的先调度
    }
}

private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
    override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
        val minShare1 = s1.minShare
        val minShare2 = s2.minShare
        val runningTasks1 = s1.runningTasks
        val runningTasks2 = s2.runningTasks
        val s1Needy = runningTasks1 < minShare1
        val s2Needy = runningTasks2 < minShare2
        val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
        val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)
        val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
        val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble

        var compare = 0
        if (s1Needy && !s2Needy) { // 谁的 runningTasks1 < minShare1 谁先被调度
            return true
        } else if (!s1Needy && s2Needy) {
            return false
        } else if (s1Needy && s2Needy) { // 如果都 runningTasks < minShare
            // 则比较 runningTasks / math.max(minShare1, 1.0) 的比值 小的优先级高
            compare = minShareRatio1.compareTo(minShareRatio2)
        } else {
            // 如果都runningTasks > minShare, 则比较 runningTasks / weight 的比值
            // 小的优先级高
            compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
        }

        if (compare < 0) {
            true
        } else if (compare > 0) {
            false
        } else {
            // 如果前面都一样, 则比较 TaskSetManager 或 Pool 的名字
            s1.name < s2.name
        }
    }
}

